Forecasting Bicycle Traffic in Cologne

forecasting
open-data
geo-mapping
cross-validation
ML
Python
Author

Fabian Rosenthal

Published

December 1, 2024

A reliable forecast of bicycle traffic is an important input for planning and optimizing a city's infrastructure. In this post, I predict bicycle traffic in Cologne based on data from local bicycle counting stations.

This project was originally part of the course Machine Learning and Scientific Computing at TH Köln, and I worked on it in collaboration with my friend and colleague Felix Otte. While Felix supervised the model training and evaluation, I was responsible for data cleaning, exploration, and visualization. For this writeup, I re-ran the original trainings and added a new class-based implementation of the whole pipeline that is closer to something usable in production. As a newly added feature, I also included an implementation of prediction intervals using the mapie library.

This project showcases the skills listed in the tags above: forecasting, working with open data, geo-mapping, cross-validation, and machine learning in Python.

This post is reproducible. Just follow along and execute the code chunks locally.

Introduction

To make cycling in a bigger city more attractive and safe, the infrastructure has to keep up with the demands of its users. The city of Cologne has installed several bicycle counting stations to monitor the traffic, and the data is available on the city's open data portal. As a former bike messenger, I immediately sympathized with this data set and found it fun to work with. However, the data quality is not perfect and the data is not consistent across the different stations, so we have to be a bit careful at times.

This project forecasts future bicycle demand with an XGBoost machine learning model trained on the time series of past counts as well as weather data. We will train location-specific models that specialize in predicting the counts at a single station, plus fallback/baseline versions that can be used to predict demand at new locations. With this technique we can hopefully make accurate predictions both for long-established stations and for ones that have only recently opened at the time of the prediction.

Code
import logging
import math
import os
import pickle
import json
import joblib
import zipfile
from datetime import datetime
from pathlib import Path
from pprint import pprint

import geopandas as gpd
import humanize
import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import plotly.express as px
import plotly.graph_objs as go
import polars as pl
import polars.selectors as cs
import requests
# import shap

from adjustText import adjust_text
from geopy.geocoders import Nominatim
from great_tables import GT
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.metrics import (mean_absolute_percentage_error,
                             root_mean_squared_error)
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import (FunctionTransformer, MinMaxScaler,
                                   OneHotEncoder, SplineTransformer)
from xgboost import XGBRegressor

from lookup import months_lookup, names_lookup
from postcodes import postcodes

Data Loading

Package-wise, we will use polars and pandas for data manipulation, optuna for hyperparameter optimization, and sklearn/XGBoost for the machine learning part. Let's also define a function that downloads the bicycle counting data from the open data portal of the city of Cologne. As the tables are not really consistent, we need to take care of the following cleaning steps:

  1. Ensure the German column names are correctly encoded. We have to rename them because the encoding is not consistent across files.
  2. Replace the month names with integers to make conversion to pl.Date easier.
  3. Extract the year from the file name and add it as a column.
  4. Convert year and month to a pl.Date column.
  5. Remove rows that include yearly sums.
  6. Remove columns that have too few data points.
  7. Unpivot the data to have a long format.
  8. Cast the count column to pl.Int64.
  9. Drop rows with missing values. Some files even have blank rows in between.
Code
def read_in(url):
    expr = pl.col("month")
    for old, new in months_lookup.items():
        expr = expr.str.replace_all(old, new)

    df = pd.read_csv(
        url,
        encoding='latin1',
        thousands=".",
        decimal=",",
        sep=";", 
        index_col=False,
    )

    df.rename(columns={ df.columns[0]: "month" }, inplace = True)
    
    return (
        pl.DataFrame(df)
        # .filter(~pl.all_horizontal(pl.all().is_null()))
        .rename(names_lookup, strict=False)
        .filter(pl.col("month") != "Jahressumme")
        .with_columns(month = pl.col("month").str.replace(r"\d+", "").str.replace(" ", ""))
        .with_columns(expr)
        .with_columns(date = pl.date(pl.lit(int(url[-9:-5])), pl.col("month"), 1))
        # .drop("month", "Hohe Pforte", "Neusser Straße", strict=False)
        .unpivot(cs.numeric(), index="date", variable_name="location", value_name="count")
        .cast({"count": pl.Int64})
        .drop_nulls()
    )

def load_tables(urls):
    return (
        pl.concat([read_in(url) for url in urls], rechunk=True)
        .with_columns(
            location = pl.col("location").cast(pl.Categorical),
            # quarter = pl.col("date").dt.quarter(),
            # month = pl.col("date").dt.month(),
            # days_from_start = (pl.col("date") - pl.col("date").min()).dt.total_days(),
            # sin_month = (pl.col("date").dt.month() * 2 * math.pi / 24).sin()
            )
    )

def get_yearly_sums(df):
    return (
        df
        .with_columns(year = pl.col("date").dt.year())
        .group_by("location", "year")
        .agg(yearly_sum = pl.sum("count"))
        .group_by("location")
        .agg(yearly_sum_avg = pl.col("yearly_sum").mean().cast(pl.Int64))
        .sort("yearly_sum_avg", descending=True)
    )

We can use this in a list comprehension to download and concatenate all .csv files from the portal. The links are stored in a text file, so we read its lines first. The second part includes some feature engineering that might get moved later on.

Code
working_dir_path = Path(".")
links_file = working_dir_path / "links/links.txt"

with open(links_file, "r") as f:
    df = load_tables(f.readlines())

yearly_sums = get_yearly_sums(df)

Let’s have a look at the start dates of each location as well as the average yearly sum of bicycles counted at each location.

Code
GT(
    (
        df
        .group_by("location")
        .agg(pl.min("date").dt.strftime("%Y-%m").alias("active_since"))
        .join(yearly_sums, on="location")
        .sort("active_since")
    )
).cols_label(
    location="Location",
    active_since="Active Since",
    yearly_sum_avg="Avg. Yearly Sum"
)
Location Active Since Avg. Yearly Sum
Zülpicher Straße 2009-01 1553727
Neumarkt 2009-01 1462115
Deutzer Brücke 2009-01 1379376
Hohenzollernbrücke 2009-01 784299
Bonner Straße 2011-04 853279
Venloer Straße 2014-07 1740725
A.-Silbermann-Weg 2016-01 1050447
Stadtwald 2016-01 789168
Niederländer Ufer 2016-01 781918
A.-Schütte-Allee 2016-01 654382
Vorgebirgspark 2016-01 281671
Vorgebirgswall 2018-06 962966
Universitätsstraße 2020-04 1441397
Rodenkirchener Brücke 2021-01 603411
Severinsbrücke 2021-01 540510
Neusser Straße 2021-08 829417
Hohe Pforte 2022-01 1105224
Code
fig, ax = plt.subplots(figsize=(8, 10))

# create lollipop chart
ax.hlines(y=yearly_sums["location"], xmin=0, xmax=yearly_sums["yearly_sum_avg"], color='navy')
ax.plot(yearly_sums["yearly_sum_avg"], yearly_sums["location"], "o", markersize=8, color='navy')

# set plot title and axis labels
ax.set_title('Avg. Yearly Sums')
ax.set_xlabel('Bicycles Counted in a Year')
ax.set_ylabel('Location')

# set y-axis limits and invert the y-axis
ax.set_ylim(ax.get_ylim()[::-1])

# hide top and right spines
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)

# # add annotations for the counts
for i, (count, loc) in enumerate(zip(yearly_sums["yearly_sum_avg"], yearly_sums["location"])):
    ax.annotate(f"{int(count):}", xy=(count + 50000, i), va='center')

# x-axis ticks
ticks = [0, 500_000, 1_000_000, 1_500_000, 2_000_000]
ticks_labels = ["0", "500k", "1M", "1.5M", "2M"]
ax.set_xticks(ticks)
ax.set_xticklabels(ticks_labels, fontsize=12)
ax.grid(False)

plt.tight_layout()
plt.show()

Spotlight “Zülpicher Straße”

Zülpicher Straße is one of the first locations equipped with a counter and records the second-highest number of bicycles counted per year. We will plot the data with plotly to make it interactive. A trendline helps us understand the general development of the counts.

Code
zuelpicher = (
    df
    .filter(pl.col("location") == "Zülpicher Straße").sort("date")
    .to_pandas()
)

fig = px.scatter(
    zuelpicher, x="date", y="count", 
    title="Counts at Location Zülpicher Straße", 
    trendline="ols", 
    trendline_scope="overall",
    template="simple_white")
fig.add_scatter(x=zuelpicher["date"], y=zuelpicher["count"].rolling(1).mean(), mode='lines', name="Timeseries")
fig.update_xaxes(title_text='Date')
fig.update_yaxes(title_text='Bicycles Counted')
fig.show()

Draw Locations on a Map

Scrape the Geolocation Data

In order to draw the counting stations on a map, we need their coordinates. Since the Cologne open data portal does not provide this information, we will use the geopy library to query OpenStreetMap, specifically the Nominatim geocoder. We store the results in a dictionary and convert it to a pd.DataFrame for easier handling. The call geolocator.geocode(loc + ", Köln, Germany") sends a request to the Nominatim API and returns the location of the address. This is only approximate, as the exact counter position may differ from the geocoded address.

Code
geolocator = Nominatim(user_agent="myapp")

coordinates = {}
for loc in yearly_sums["location"].unique():
    postcode = postcodes.get(loc, "")
    tmp = geolocator.geocode(loc + f",{postcode} Köln, Germany")
    coordinates[loc] = {
        "latitude": tmp.latitude,
        "longitude": tmp.longitude
    }

coord_df = pd.DataFrame(coordinates).T.reset_index().rename(columns={"index": "location"})

counter_geo = yearly_sums.to_pandas().merge(coord_df, on="location")
counter_geo["yearly_sum_avg"] = counter_geo["yearly_sum_avg"].astype("float64")

Plotting

Now let’s put everything together and make an interactive plot. We will use plotly to create a scattermapbox plot and a slider to switch between the dates.

Code
coord_pl = pl.DataFrame(coord_df)
lon_mean = coord_pl["longitude"].mean()
lat_mean = coord_pl["latitude"].mean()

date_geo_df = (
    df
    .join(
        coord_pl.with_columns(pl.col("location").cast(pl.Categorical)),
        on="location", 
        how="left"
        )
    .with_columns(
        norm_count = 25 * pl.col("count") / pl.col("count").max(),
        human_count = pl.col("count").map_elements(lambda s: humanize.intword(s), return_dtype=pl.String)
    )
    .select(pl.exclude("month", "quarter", "days_from_start", "sin_month"))
    .sort("date")
)

# csv_file = Path("data") / "date_geo.csv"
# if csv_file.exists():
#     date_geo_df.write_csv(csv_file)

n_dates = len(date_geo_df["date"].unique())

fig = go.Figure()
mask = [False] * n_dates

steps = []
# maintain_order keeps the slider steps in chronological order
for i, (date, data) in enumerate(date_geo_df.group_by("date", maintain_order=True)):

    date = date[0].strftime("%Y-%m")
    mask = [j == i for j in range(n_dates)]

    trace = go.Scattermapbox(
        lon=data["longitude"],
        lat=data["latitude"],
        mode='markers',
        marker=dict(
            color='darkred',
            size=data['norm_count'],
            sizemode='area',
            sizeref=0.025,
            sizemin=5
        ),
        hoverinfo='text',
        hovertext=[f"Location: {loc}<br>Date: {date}<br>Count: {val}" 
                   for loc, val in zip(data['location'], data['human_count'])],
        visible=(i == 0),  # Only first trace visible by default
        showlegend=False
    )
    fig.add_trace(trace)
    
    # Create slider step
    step = dict(
        method='update',
        args=[{'visible': mask},
              {'title': f'Cologne Bicycle Counter - {date}'}],
        label=date
    )
    steps.append(step)

# Configure slider
sliders = [dict(
    active=0,
    currentvalue={"prefix": "Date: "},
    pad={"t": 10},
    ticklen=0,
    minorticklen=0,
    transition={
        'duration': 10000,
        'easing': 'circle-in-out'},
    steps=steps
)]

# Update layout
fig.update_layout(
    mapbox_style="carto-positron", #"open-street-map",
    mapbox=dict(
        center=dict(
            lon=lon_mean, 
            lat=lat_mean
        ),
        zoom=11
    ),
    title='Cologne Bicycle Counter',
    height=600,
    width=600,
    margin={"r":0,"t":50,"l":0,"b":0},
    sliders=sliders
)

# Show the figure
fig.show()

Incorporating Weather Features

We download the weather data from the web and extract the necessary file from the archive. The data is then loaded into a pl.DataFrame and cleaned: we mainly rename columns and cast types. Lastly, we deselect some columns that are not needed.

Code
weather_url = Path("links/weather.txt").read_text()
data_path = Path("data")
zip_path = data_path / "weather.zip"
data_path.mkdir(exist_ok=True)

response = requests.get(weather_url)
with open(zip_path, 'wb') as file:
    file.write(response.content)

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(data_path)

weather_csv = [os.path.join(data_path, f) for f in os.listdir(data_path) if 'produkt' in f][0]

weather_df = (
    pl.read_csv(
        weather_csv, 
        separator=";", 
        has_header=True,
        schema=pl.Schema(
            {
                "station_id": pl.Int64,
                "date_begin": pl.String,
                "date_end": pl.Int64,
                "qn_4": pl.Int64,
                "sky_cov": pl.Float64,
                "air_temp_daymean_month_mean": pl.Float64,
                "air_temp_daymax_month_mean": pl.Float64,
                "air_temp_daymin_month_mean": pl.Float64,
                "air_temp_daymax_month_max": pl.Float64,
                "air_temp_daymin_month_min": pl.Float64,
                "wind_speed_month_mean": pl.Float64,  # MO_FK
                "wind_speed_daymax_month_max": pl.Float64,  # MX_FX
                "sunshine_duration": pl.Float64,  # MO_SD_S
                "QN_6": pl.Int64,
                "precipitation_month_sum": pl.Float64,
                "precipitation_daymax_month_max": pl.Float64,
                "eor": pl.String
            }
        )
        )
        .with_columns(
            date = pl.col("date_begin").str.to_date(format="%Y%m%d")
        )
        .select(pl.col("date"), cs.numeric())
        .select(cs.exclude("qn_4", "eor", "QN_6", "station_id", "date_end"))
)

GT(weather_df.sample(10))
date sky_cov air_temp_daymean_month_mean air_temp_daymax_month_mean air_temp_daymin_month_mean air_temp_daymax_month_max air_temp_daymin_month_min wind_speed_month_mean wind_speed_daymax_month_max sunshine_duration precipitation_month_sum precipitation_daymax_month_max
1969-12-01 6.57 -1.48 0.7 -4.48 2.3 6.2 -999.0 -10.8 32.0 22.5 6.2
2019-03-01 5.62 7.99 12.67 2.99 3.06 20.4 28.8 -2.8 98.7 74.7 18.2
2015-11-01 6.32 9.47 12.53 6.19 2.63 18.8 21.3 -2.8 46.1 88.0 11.4
1967-09-01 5.33 14.63 19.81 9.99 2.0 27.6 -999.0 4.4 136.7 62.0 17.6
1972-04-01 5.83 8.43 12.65 3.96 2.57 22.1 18.0 -1.9 118.5 47.5 6.3
1988-03-01 7.02 4.93 8.07 1.87 2.55 14.4 23.7 -5.8 48.7 159.6 17.1
1963-05-01 5.9 12.63 17.43 7.1 2.32 27.1 -999.0 0.0 153.6 65.7 15.8
1962-12-01 5.38 -0.57 3.21 -4.0 2.57 12.0 -999.0 -13.7 73.8 97.7 17.3
2005-10-01 3.74 13.27 18.55 8.96 2.35 23.3 13.5 3.1 192.9 51.0 15.4
2018-03-01 6.23 5.0 9.47 0.65 2.84 16.1 19.4 -7.3 105.7 69.7 11.5
Code
weather_detail = (
    weather_df
    .filter(pl.col("date") >= zuelpicher["date"].min())
    .filter(pl.col("date") <= zuelpicher["date"].max())
    .to_pandas()
)

fig = px.scatter(
    weather_detail, x="date", y="air_temp_daymean_month_mean", 
    title="Daily Average Air Temperature (Monthly Mean) vs. Date", 
    trendline="ols", 
    trendline_scope="overall",
    template="simple_white")
fig.add_scatter(x=weather_detail["date"], y=weather_detail["air_temp_daymean_month_mean"].rolling(1).mean(), mode='lines', name="Timeseries")
# axes titles
fig.update_xaxes(title_text='Date')
fig.update_yaxes(title_text='Air Temp. [°C]')
fig.show()

Merging the Data

We merge the weather data with the bicycle counts. We use a left join because we want to keep all the bicycle counts. In a second step, we split the data into a training and a test set: the data from 2022 will be used only for testing. We can split the data by filtering on the date column.

Code
ml_data = df.join(weather_df, on="date", how="left")
train_data = ml_data.filter(pl.col("date") < datetime(2022, 1, 1))
test_data = ml_data.filter(pl.col("date") >= datetime(2022, 1, 1))
train_data.write_csv("data/train_data.csv")

Machine Learning

Next up is the fun part: Actual Machine Learning! In this project, we will use the XGBoost regression model. We will use Optuna to sample hyperparameters and cross-validate them with the TimeSeriesSplit.

Feature Engineering

Let’s define some sklearn transformers to perform feature engineering. We will create a ColumnTransformer that will be used in a Pipeline.

Code
def make_feature_transformer(X):
    numerical_columns = X.select_dtypes(include=np.number).columns.tolist()
    categorical_columns = X.select_dtypes(include=['object', 'category']).columns.tolist()

    spline_pipeline = Pipeline(
        steps=[
            ('month_transformer', MonthTransformer()),
            ('cyclic_hour', periodic_spline_transformer(12, n_splines=6))
        ]
    )

    sin_pipeline = Pipeline(
        steps=[
            ('month_transformer', MonthTransformer()),
            ('sine_transformer', SinusTransformer())
        ]
    )

    col_transformer = ColumnTransformer(
        transformers=[
            ('quarter_transformer', QuarterTransformer(), ['date']),
            ('month_transformer', MonthTransformer(), ['date']),
            ('sin_transformer', sin_pipeline, ['date']),
            ('spline_transformer', spline_pipeline, ['date']),
            ('num_scale', MinMaxScaler(), numerical_columns),
            ('cat_encode', OneHotEncoder(handle_unknown='ignore'), categorical_columns)
        ],
        remainder="passthrough",
        verbose_feature_names_out=False
    )
    return col_transformer
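
The custom transformers referenced above (QuarterTransformer, MonthTransformer, SinusTransformer, and periodic_spline_transformer) are defined outside of this excerpt. For completeness, here is a minimal sketch of how they could look; it follows scikit-learn's time-related feature engineering example, and the originals may differ in detail.

Code
class QuarterTransformer(BaseEstimator, TransformerMixin):
    """Extract the quarter (1-4) from a single datetime column."""
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return pd.DataFrame({"quarter": pd.to_datetime(X.iloc[:, 0]).dt.quarter})


class MonthTransformer(BaseEstimator, TransformerMixin):
    """Extract the month (1-12) from a single datetime column."""
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return pd.DataFrame({"month": pd.to_datetime(X.iloc[:, 0]).dt.month})


class SinusTransformer(BaseEstimator, TransformerMixin):
    """Map month numbers onto a sine wave with a yearly period."""
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return np.sin(2 * np.pi * np.asarray(X) / 12)


def periodic_spline_transformer(period, n_splines=None, degree=3):
    """Periodic B-spline features, as in the scikit-learn time-features example."""
    if n_splines is None:
        n_splines = period
    n_knots = n_splines + 1  # periodic splines repeat the first knot
    return SplineTransformer(
        degree=degree,
        n_knots=n_knots,
        knots=np.linspace(0, period, n_knots).reshape(n_knots, 1),
        extrapolation="periodic",
        include_bias=True,
    )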

Tuning objective

Next, we define a custom objective function that we want to minimize. Optuna calls this function once per trial; it returns the mean cross-validated RMSE (scikit-learn's neg_root_mean_squared_error score negated back into a positive RMSE), which Optuna then minimizes. We use the XGBRegressor from XGBoost as the model and the transformer shown in the code block above.

Code
def objective(trial, X_train, y_train):
    col_transformer = make_feature_transformer(X_train)

    xgbr = XGBRegressor(
            n_estimators=500,
            learning_rate=trial.suggest_float("eta", 0.0001, 1),
            gamma=trial.suggest_int('gamma', 0, 1000),
            max_depth=trial.suggest_int("max_depth", 1, 50),
            min_child_weight=trial.suggest_int('min_child_weight', 0, 100),
            max_delta_step=trial.suggest_int('max_delta_step', 0, 100),
            subsample=trial.suggest_float('subsample', 0, 1),
            colsample_bytree=trial.suggest_float('colsample_bytree', 0, 1),
            reg_alpha=trial.suggest_int('reg_alpha', 0, 1000),
            reg_lambda=trial.suggest_int('reg_lambda', 0, 1000),
            enable_categorical=True,
            random_state=42,
        )

    pipeline = make_pipeline(col_transformer, xgbr)

    cv_score = cross_val_score(
        pipeline,
        X=X_train,
        y=y_train,
        cv=ts_cv,
        scoring="neg_root_mean_squared_error",
    )

    return -cv_score.mean()
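
Two names used in this objective and in the class below, ts_cv and logger, are not defined in this excerpt. A minimal setup could look like this; the number of splits and the logging configuration are illustrative assumptions.

Code
# assumed setup: module-level logger and expanding-window CV splitter
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

ts_cv = TimeSeriesSplit(n_splits=5)  # illustrative number of splits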

The Predictor Class

In order to fit and predict for multiple counter locations, we have to write a bit of custom OOP code. Due to the data structure, we want to map models that specialize in predicting the bicycle counts for a single location to that location's data. We define a class SimpleModel that holds these models and provides methods for tuning, refitting, and predicting. “Simple” refers to the fact that we are predicting single point values. We can inherit from this class to extend it with prediction intervals later on.

Code
class SimpleModel():
    def __init__(self, fixed_params=None, cache_dir='optimization_cache'):
        if fixed_params is None:
            fixed_params = {
                "n_estimators": 500,
                "enable_categorical": True,
                "random_state": 42
            }
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
        self.fixed_params = fixed_params
        self.is_fitted = False

    def set_objective(self, objective):
        self.objective = objective

    def set_fixed_params(self, fixed_params):
        self.fixed_params = fixed_params

    def set_studies(self, studies):
        self.studies = studies

    def _make_writable_string(self, location):
        return (
            location
            .lower()
            .replace(".", "")
            .replace(" ", "_")
            .replace("ü", "ue")
            .replace("ö", "oe")
            .replace("ä", "ae")
            .replace("ß", "ss")
        )

    def _get_cache_path(self, location):
        """Generate a cache file path for a specific location."""
        return os.path.join(self.cache_dir, f"{location}_study_cache.json")
    
    def _save_study_cache(self, location, study_data):
        """Save study results to a JSON cache file."""
        cache_path = self._get_cache_path(location)
        with open(cache_path, 'w') as f:
            json.dump(study_data, f)
    
    def _load_study_cache(self, location):
        """Load cached study results for a location."""
        cache_path = self._get_cache_path(location)
        if os.path.exists(cache_path):
            with open(cache_path, 'r') as f:
                return json.load(f)
        return None

    def _save_failed_cache(self, failed_studies):
        """Save failed studies to a JSON cache file."""
        cache_path = os.path.join(self.cache_dir, "failed_studies.json")
        with open(cache_path, 'w') as f:
            json.dump(failed_studies, f)
    
    def _load_failed_cache(self):
        """Load failed studies from a JSON cache file."""
        cache_path = os.path.join(self.cache_dir, "failed_studies.json")
        if os.path.exists(cache_path):
            with open(cache_path, 'r') as f:
                return json.load(f)
        return {}

    def tune(self, train_data, n_trials=200, use_stored=False):
        metrics = {}
        succesful_studies = {}
        failed_studies = {} if not use_stored else self._load_failed_cache()

        for location, group in train_data.group_by('location'):
            location = location[0]

            logger.info(f"Tuning {location}.")

            if location in failed_studies.keys():
                continue
            
            loc_str = self._make_writable_string(location)
            # Check if we can use stored results
            if use_stored:
                cached_study = self._load_study_cache(loc_str)
                if cached_study:
                    logger.info(f"Using stored hyperparameters for {location}.")
                    inner_dict = cached_study[location]
                    succesful_studies[location] = inner_dict['best_params']
                    metrics[location] = inner_dict['best_value']
                    continue
            
            X_train = group.drop("count").to_pandas()
            y_train = group["count"]
            study = optuna.create_study(direction="minimize")
            
            try:
                logger.info(f"Optimizing hyperparameters for {location}.")
                study.optimize(
                    lambda trial: self.objective(trial, X_train, y_train), 
                    n_trials=n_trials
                    )
                succesful_studies[location] = study.best_trial.params
                metrics[location] = study.best_trial.value

                self._save_study_cache(
                    loc_str, 
                    {
                        location:
                        {
                        'best_params': study.best_trial.params,
                        'best_value': study.best_trial.value
                        }
                    }
                    )
            except ValueError as e:
                failed_studies[location] = str(e)

        self.succesful_studies = succesful_studies
        self.metrics = metrics

        # find location with the min mape in the dict
        self.best_location = pd.Series(metrics).idxmin()
        best_params = succesful_studies[self.best_location]

        for location in failed_studies.keys():
            self.succesful_studies[location] = best_params
            self._save_study_cache(
                self._make_writable_string(location),
                {
                    location: {
                        'best_params': best_params,
                        'best_value': metrics[self.best_location]
                    }
                }
            )
        self.failed_studies = failed_studies
        self._save_failed_cache(failed_studies)


    def _refit_inner(self, train_data, location):
        X_train = train_data.drop("count").to_pandas()
        y_train = train_data["count"]

        pipe = make_pipeline(
            make_feature_transformer(X_train),
            XGBRegressor(
                **self.succesful_studies[location],
                **self.fixed_params
            )
        )
        return pipe.fit(X_train, y_train)

    def refit(self, train_data):
        # first refit the baseline model
        self._baseline_pipe = self._refit_inner(train_data, self.best_location)
        self._pipeline_dict = {}
        # fit location specific models
        for location, data in self.succesful_studies.items():
            logger.info(f"Refitting model for {location}.")
            self._pipeline_dict[location] = self._refit_inner(
                train_data=train_data.filter(pl.col("location") == location),
                location=location
            )
        self.is_fitted = True

    def check_fitted(self):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted yet.")
        return True

    def predict(self, X_test, loc_str=""):
        self.check_fitted()
        # return the specific model when available, otherwise the baseline model
        # and make predictions
        return self._pipeline_dict.get(loc_str, self._baseline_pipe).predict(X_test)

    def eval(self, test_data):
        self.check_fitted()
        metrics_by_location = {}
        for location, group in test_data.group_by('location'):
            location = location[0]
            X_test = group.drop("count").to_pandas()
            y_test = group["count"]
            y_pred = self.predict(X_test, location)

            metrics_by_location[location] = {
                "rmse": np.round(root_mean_squared_error(y_test, y_pred)),
                "mape": np.round(mean_absolute_percentage_error(y_test, y_pred), 2)
            }

        return (
            pl.DataFrame(
                pd.DataFrame(metrics_by_location).T
                .reset_index()
                .rename(columns={"level_0": "location", "index": "location"})
            )
            .with_columns(
                pl.col("location").is_in(self.succesful_studies.keys()).not_().alias("baseline"),
                pl.col("location").cast(pl.Categorical).alias("location"),
                pl.col("rmse").cast(pl.Int64).alias("rmse"),
            )
            .join(
                test_data
                .group_by("location")
                .agg(pl.col("count").mean().cast(pl.Int64).alias("count"))
                .filter(pl.col("location").is_in(metrics_by_location.keys()))
                , on="location"
                )
            .sort("mape")
        )

    def get_pipeline(self, loc_str):
        return self._pipeline_dict.get(loc_str, self._baseline_pipe)
    
    def save(self, path="models/trained_ml_pipeline.pkl"):
        joblib.dump(self, path)

    def plot_predictions(self, data, loc_str):
        if not os.path.exists("images"):
            os.mkdir("images")

        self.check_fitted()
        X_test = data.filter(data["location"] == loc_str).drop("count").to_pandas()
        y_test = data.filter(data["location"] == loc_str)["count"].to_pandas()
        y_pred = self.predict(X_test, loc_str)

        combined = pd.DataFrame({"date": X_test["date"], "Actual": y_test, "Predicted": y_pred})

        fig = px.line(
            combined, x="date", y=["Actual", "Predicted"], 
            title=f"Counter {loc_str}", 
            template="simple_white")
        fig.update_xaxes(title_text='Date')
        fig.update_yaxes(title_text='Count')
        # fig.write_image(f"images/{loc_str}.png")
        fig.show()

As all necessary boilerplate code is already written in the SimpleModel class, we can now use it with very little code.

Code
import joblib
from model_class import SimpleModel

pickled_path = Path("trained_models/trained_ml_pipeline.pkl")
if pickled_path.exists():
    ml_pipeline = joblib.load(pickled_path)
else:
    ml_pipeline = SimpleModel()
    ml_pipeline.tune(train_data, n_trials=200, use_stored=True)
    ml_pipeline.refit(train_data)
    ml_pipeline.save(pickled_path)

Evaluation

Finally, we can evaluate the tuned pipeline of transformers and models on the held-out data from the yet unseen year 2022.
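
For reference, the two error metrics reported below are the root mean squared error and the mean absolute percentage error, \(\mathrm{RMSE}=\sqrt{\tfrac{1}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)^2}\) and \(\mathrm{MAPE}=\tfrac{1}{n}\sum_{i=1}^{n}\left|\tfrac{y_i-\hat{y}_i}{y_i}\right|\), computed per location on the monthly counts of 2022.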

Code
metrics_df = ml_pipeline.eval(test_data)

(
    GT(metrics_df)
    .tab_header("Metrics on Holdout Data")
    .cols_label(location="Location", rmse="RMSE", mape="MAPE", count="Avg. Monthly Count")
    .cols_move("count", after="location")
)
Metrics on Holdout Data
Location Avg. Monthly Count RMSE MAPE baseline
Neumarkt 126157 11469 0.08 false
Vorgebirgswall 92718 11920 0.09 false
Hohe Pforte 92102 11028 0.1 true
Hohenzollernbrücke 73095 11204 0.1 false
A.-Silbermann-Weg 83481 10751 0.11 false
Venloer Straße 167610 23240 0.11 false
Niederländer Ufer 72004 9990 0.11 false
Zülpicher Straße 148239 26699 0.15 false
Stadtwald 74314 15191 0.16 false
Neusser Straße 98155 18376 0.17 false
Deutzer Brücke 142528 35616 0.19 false
Bonner Straße 79392 18527 0.19 false
Severinsbrücke 48116 12039 0.22 false
Universitätsstraße 133047 35229 0.23 false
Vorgebirgspark 31012 10887 0.25 false
A.-Schütte-Allee 61827 19031 0.3 false
Rodenkirchener Brücke 52069 18055 0.37 false

Plot the Predictions vs Actual Counts

First, let’s start with some locations that performed poorly:

Code
ml_pipeline.plot_predictions(test_data, "Bonner Straße")
ml_pipeline.plot_predictions(test_data, "Severinsbrücke")

While our model for Severinsbrücke seems to capture the seasonal pattern, it is not able to predict the peaks with the correct magnitude. The model for Bonner Straße is not able to capture the trend at all. Let’s have a look at the best locations, on the other hand:

Code
ml_pipeline.plot_predictions(test_data, "Zülpicher Straße")
ml_pipeline.plot_predictions(test_data, "Hohe Pforte")
ml_pipeline.plot_predictions(test_data, "Venloer Straße")

Extension: Prediction Intervals

To quantify the uncertainty in our predictions, we can use conformal prediction intervals. As a formal definition for general regression problems, let's assume that our training data \((X,Y)=[(x_1,y_1),\dots,(x_n,y_n)]\) is drawn i.i.d. from an unknown distribution \(P_{X,Y}\). The regression task is then defined by \(Y=\mu(X)+\epsilon\) with our model \(\mu\) and residuals \(\epsilon_i \sim P_{Y|X}\). We define a nominal coverage level \(1-\alpha\) for the prediction intervals. So we want an interval \(\hat{C}_{n,\alpha}\) for each new prediction on the unseen feature vector \(X_{n+1}\) such that \(P[Y_{n+1}\in \hat{C}_{n,\alpha}(X_{n+1})]\ge 1-\alpha\). We will use the MAPIE package to get prediction intervals that are model agnostic. This is great, because we can plug in our existing pipeline and get the intervals with just a bit of wrapping. Now, there are different approaches to constructing prediction intervals, but since we are dealing with a time series, which violates the i.i.d. assumption, we need to use ensemble batch prediction intervals (EnbPI). Read more about that here.
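
To give an intuition of how such an interval is constructed: in the simplest split-conformal variant, we compute absolute residuals \(|y_i-\hat{\mu}(x_i)|\) on data not used for fitting \(\hat{\mu}\), take their empirical \((1-\alpha)\)-quantile \(\hat{q}_{1-\alpha}\), and report \(\hat{C}_{n,\alpha}(X_{n+1})=[\hat{\mu}(X_{n+1})-\hat{q}_{1-\alpha},\ \hat{\mu}(X_{n+1})+\hat{q}_{1-\alpha}]\). EnbPI follows the same idea but obtains the residuals out-of-bag from a bootstrap ensemble and can update them as new observations arrive, which is what makes it suitable for time series.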

We will define a new class MAPIEModel that inherits from SimpleModel and extends it with the necessary methods to fit the MapieTimeSeriesRegressor and predict the intervals. We will use the BlockBootstrap method to resample the data and get the intervals. We will also define a method to plot the intervals.

This code combines some of the examples here and here.

Code
# mapie imports used below (0.x API); they are not part of the import block at the top
from mapie.metrics import regression_coverage_score, regression_mean_width_score
from mapie.regression import MapieTimeSeriesRegressor
from mapie.subsample import BlockBootstrap


class MAPIEModel(SimpleModel):
    def set_succesful_studies(self, succesful_studies):
        self.succesful_studies = succesful_studies
    
    def set_best_location(self, best_location):
        self.best_location = best_location

    def _fit_mapie(self, train_data, location, n_resamplings=100, n_blocks=3):
        X_train = train_data.drop("count").filter(train_data["location"] == location).to_pandas()
        y_train = train_data.filter(train_data["location"] == location)["count"]

        pipe = make_pipeline(
                make_feature_transformer(X_train),
                XGBRegressor(
                    **self.succesful_studies[location],
                    **self.fixed_params
                )
            )

        cv_mapietimeseries = BlockBootstrap(
            n_resamplings=n_resamplings, n_blocks=n_blocks, overlapping=False, random_state=42
        )
        mapie = MapieTimeSeriesRegressor(
            pipe,
            method="enbpi",
            cv=cv_mapietimeseries,
            agg_function="mean",
            n_jobs=-1,
        )
        return mapie.fit(X_train, y_train)

    def fit_mapie(self, train_data, n_resamplings=100, n_blocks=3):
        self.refit(train_data)
        self._baseline_mapie = self._fit_mapie(train_data, location=self.best_location)

        self.mapie_models = {}
        for location in self.succesful_studies.keys():
            logger.info(f"Fitting MAPIE model for {location}.")
            self.mapie_models[location] = self._fit_mapie(train_data, location=location, n_resamplings=n_resamplings, n_blocks=n_blocks)


        self.is_mapie_fitted = True

    def check_mapie_fitted(self):
        if not hasattr(self, "is_mapie_fitted") or not self.is_mapie_fitted:
            raise ValueError("Model has not been fitted yet.")
        return True

    def predict_mapie(self, X_test, loc_str="", alpha=0.05):
        self.check_mapie_fitted()

        mapie = self.mapie_models.get(loc_str, self._baseline_mapie)
        y_pred, y_pis = mapie.predict(X_test, alpha=alpha)

        return y_pred, y_pis

    def print_mapie_metrics(self, y_test, y_pis):
        coverage = regression_coverage_score(y_test, y_pis[:, 0, 0], y_pis[:, 1, 0])
        width = regression_mean_width_score(y_pis[:, 0, 0], y_pis[:, 1, 0])
        print(
            "Coverage and prediction interval width mean for CV+: "
            f"{coverage:.3f}, {width:.3f}"
        )

    def save(self, path="models/trained_quantile_pipeline.pkl"):
        joblib.dump(self, path)

    def plot_mapie(self, data, loc_str, alpha=0.05):
        import plotly.graph_objects as go
        self.check_mapie_fitted()

        X_test = data.filter(data["location"] == loc_str).drop("count").to_pandas()
        y_test = data.filter(data["location"] == loc_str)["count"].to_pandas()
        y_pred, y_pis = self.predict_mapie(X_test, loc_str=loc_str, alpha=alpha)

        fig = go.Figure([
            go.Scatter(
                name=f'{(1 - alpha)*100}% PI',
                x=X_test['date'],
                y=y_pis[:, 1, 0],
                mode='lines',
                marker=dict(color="#444"),
                line=dict(width=0),
                showlegend=False
            ),
            go.Scatter(
                name=f'{(1 - alpha)*100}% PI',
                x=X_test['date'],
                y=y_pis[:, 0, 0],
                marker=dict(color="#444"),
                line=dict(width=0),
                mode='lines',
                fillcolor='rgba(68, 68, 68, 0.3)',
                fill='tonexty',
                showlegend=True
            ),
            go.Scatter(
                x=X_test["date"],
                y=y_test,
                name="Actual",
                line=dict(color='rgb(31, 119, 180)'),
                mode='lines'
            ),
            go.Scatter(
                x=X_test["date"],
                y=y_pred,
                name="Predicted",
                line=dict(color='rgb(255, 127, 14)'),
                mode='lines'
            )
        ])
        fig.update_layout(
            title=dict(text=f"Prediction Intervals for {loc_str}"),
            hovermode="x",
            template="simple_white"
            )
        fig.update_xaxes(title_text='Date')
        fig.update_yaxes(title_text='Count')
        fig.show()

Now, we can instantiate the MAPIE model and set some necessary attributes. We basically transfer some optimization results to the new instance so that we do not have to tune the models again. Instead, we just fit the underlying MAPIE time series models.

Code
import joblib
from model_class import MAPIEModel

loc_str = "Venloer Straße"
pickled_path = Path("trained_models/trained_quantile_pipeline.pkl")

if pickled_path.exists():
    quant_pipeline = joblib.load(pickled_path)
else:
    quant_pipeline = MAPIEModel()
    quant_pipeline.set_best_location(loc_str)
    quant_pipeline.set_succesful_studies(ml_pipeline.succesful_studies)
    quant_pipeline.fit_mapie(train_data)
    quant_pipeline.save(pickled_path)

Now let’s plot our new prediction intervals and their mean predictions for the location “Venloer Straße” for the test year 2022.

Code
def plot_external_args(pred_class, data, loc_str, alpha=0.05):
    X_test = data.filter(data["location"] == loc_str).drop("count").to_pandas()
    y_test = data.filter(data["location"] == loc_str)["count"].to_pandas()
    y_pred, y_pis = pred_class.predict_mapie(X_test, loc_str=loc_str, alpha=alpha)

    fig = go.Figure([
        go.Scatter(
            name=f'{(1 - alpha)*100}% PI',
            x=X_test['date'],
            y=y_pis[:, 1, 0],
            mode='lines',
            marker=dict(color="#444"),
            line=dict(width=0),
            showlegend=False
        ),
        go.Scatter(
            name=f'{(1 - alpha)*100}% PI',
            x=X_test['date'],
            y=y_pis[:, 0, 0],
            marker=dict(color="#444"),
            line=dict(width=0),
            mode='lines',
            fillcolor='rgba(68, 68, 68, 0.3)',
            fill='tonexty',
            showlegend=True
        ),
        go.Scatter(
            x=X_test["date"],
            y=y_test,
            name="Actual",
            line=dict(color='rgb(31, 119, 180)'),
            mode='lines'
        ),
        go.Scatter(
            x=X_test["date"],
            y=y_pred,
            name="Predicted",
            line=dict(color='rgb(255, 127, 14)'),
            mode='lines'
        )
    ])
    fig.update_layout(
        title=dict(text=f"Prediction Intervals for {loc_str}"),
        hovermode="x",
        template="simple_white",
        font_family="Times New Roman",
        font_size=16,
        width=800,
        height=300
        )
    fig.update_xaxes(title_text='Date')
    fig.update_yaxes(title_text='Count')
    return fig


# quant_pipeline.plot_mapie(test_data, loc_str, alpha=0.05)
fig = plot_external_args(quant_pipeline, test_data, loc_str, alpha=0.05)
fig

The model is fairly uncertain about the lower bound of the counts in the unseen time range. The upper bound at least includes all the actual counts. This is a good sign, but the model might be too conservative.
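
We can quantify this impression with the print_mapie_metrics helper defined above, which reports the empirical coverage and the mean interval width. The snippet below is a usage sketch that reuses the existing test split; coverage well above the nominal 95% together with a wide interval would confirm that the model is conservative.

Code
X_eval = test_data.filter(test_data["location"] == loc_str).drop("count").to_pandas()
y_eval = test_data.filter(test_data["location"] == loc_str)["count"].to_pandas()

# predict point values and 95% intervals for the chosen location
_, y_pis_eval = quant_pipeline.predict_mapie(X_eval, loc_str=loc_str, alpha=0.05)

# empirical coverage and mean width of the intervals on the 2022 data
quant_pipeline.print_mapie_metrics(y_eval, y_pis_eval)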

Extension for Real-Time Predictions

An extension to this project allowing real-time predictions is described in this follow-up post. There, I use the Flask library to create a REST API that makes predictions using our pickled models. We will also use real-time weather data from the tomorrow.io API to build an inference feature vector.
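
To give an idea of the direction, here is a minimal sketch of such an endpoint. It is only an assumption about the follow-up post's implementation: the route name, the payload fields, and the expectation that the request already contains the weather features are all illustrative.

Code
from flask import Flask, jsonify, request
import joblib
import pandas as pd

app = Flask(__name__)
# load the pickled SimpleModel trained in this post
model = joblib.load("trained_models/trained_ml_pipeline.pkl")

@app.route("/predict", methods=["POST"])
def predict():
    # the payload is assumed to contain 'date', 'location' and the weather columns
    payload = request.get_json()
    X = pd.DataFrame([payload])
    X["date"] = pd.to_datetime(X["date"])
    y_pred = model.predict(X, loc_str=payload.get("location", ""))
    return jsonify({"prediction": float(y_pred[0])})

if __name__ == "__main__":
    app.run(debug=True)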